/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.engine.compaction;

import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Runnable that repeatedly takes compaction tasks from a shared priority queue and starts them.
 *
 * <p>The loop runs until the executing thread is interrupted. Any other {@link Throwable} raised
 * while handling a task is logged and the loop continues with the next task.
 */
public class CompactionWorker implements Runnable {
    private static final Logger log = LoggerFactory.getLogger("COMPACTION");
    private final int threadId;
    private final FixedPriorityBlockingQueue<AbstractCompactionTask> compactionTaskQueue;

    /**
     * Creates a worker bound to the given task queue.
     *
     * @param threadId identifier used only in this worker's log messages
     * @param compactionTaskQueue queue from which tasks are taken
     */
    public CompactionWorker(
            int threadId, FixedPriorityBlockingQueue<AbstractCompactionTask> compactionTaskQueue) {
        this.threadId = threadId;
        this.compactionTaskQueue = compactionTaskQueue;
    }

    /**
     * Takes tasks from the queue one at a time. For each task, the poll is reported to {@link
     * CompactionMetricsRecorder}; if {@code task.checkValidAndSetMerging()} returns {@code true},
     * the task's summary is wrapped in a {@link CompactionTaskFuture}, registered with {@link
     * CompactionTaskManager}, and the task is started.
     *
     * <p>If the thread is interrupted while waiting on the queue, the interrupt status is restored
     * and the method returns.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                AbstractCompactionTask task;
                try {
                    task = compactionTaskQueue.take();
                } catch (InterruptedException e) {
                    log.warn("CompactionThread-{} terminates because interruption", threadId);
                    // Restore the interrupt status so that the code running this worker can
                    // still observe that the thread was interrupted.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (task == null) {
                    continue;
                }
                // add metrics
                CompactionMetricsRecorder.recordTaskInfo(
                        task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size());
                if (task.checkValidAndSetMerging()) {
                    CompactionTaskSummary summary = task.getSummary();
                    CompactionTaskFuture future = new CompactionTaskFuture(summary);
                    CompactionTaskManager.getInstance().recordTask(task, future);
                    task.start();
                }
            } catch (Throwable t) {
                // Deliberately broad: a failure in one task must not terminate the worker loop.
                log.error("CompactionWorker.run(), Exception.", t);
            }
        }
    }

    /**
     * {@link Future} view over a {@link CompactionTaskSummary}. Completion and cancellation state
     * are read from the summary; the blocking {@code get} methods poll the summary periodically.
     */
    static class CompactionTaskFuture implements Future<CompactionTaskSummary> {
        /** Interval, in milliseconds, between two checks of the summary while waiting. */
        private static final long POLL_INTERVAL_MILLIS = 100L;

        CompactionTaskSummary summary;

        public CompactionTaskFuture(CompactionTaskSummary summary) {
            this.summary = summary;
        }

        /**
         * Calls {@code cancel()} on the underlying summary.
         *
         * @param mayInterruptIfRunning ignored
         * @return always {@code true}
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            summary.cancel();
            return true;
        }

        @Override
        public boolean isCancelled() {
            return summary.isCancel();
        }

        @Override
        public boolean isDone() {
            return summary.isFinished();
        }

        /**
         * Blocks until the summary reports that it is finished, checking every {@value
         * #POLL_INTERVAL_MILLIS} milliseconds.
         *
         * @return the summary backing this future
         * @throws InterruptedException if the calling thread is interrupted while sleeping
         */
        @Override
        public CompactionTaskSummary get() throws InterruptedException, ExecutionException {
            while (!summary.isFinished()) {
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            }
            return summary;
        }

        /**
         * Blocks until the summary reports that it is finished or the timeout elapses.
         *
         * <p>The summary is checked at most every {@value #POLL_INTERVAL_MILLIS} milliseconds
         * regardless of {@code unit}, and the last sleep is shortened so the wait does not run
         * past the requested timeout.
         *
         * @param timeout maximum time to wait; a non-positive value fails immediately when the
         *     summary is not yet finished
         * @param unit unit of {@code timeout}
         * @return the summary backing this future
         * @throws InterruptedException if the calling thread is interrupted while sleeping
         * @throws TimeoutException if the summary is still unfinished once the timeout has elapsed
         */
        @Override
        public CompactionTaskSummary get(long timeout, @NotNull TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            final long timeoutNanos = unit.toNanos(timeout);
            final long pollNanos = TimeUnit.MILLISECONDS.toNanos(POLL_INTERVAL_MILLIS);
            // Measure elapsed time rather than computing an absolute deadline, so that a
            // saturated timeout (Long.MAX_VALUE nanoseconds) cannot overflow.
            final long startNanos = System.nanoTime();
            while (!summary.isFinished()) {
                long elapsedNanos = System.nanoTime() - startNanos;
                if (elapsedNanos >= timeoutNanos) {
                    throw new TimeoutException("Timeout when trying to get compaction task summary");
                }
                TimeUnit.NANOSECONDS.sleep(Math.min(pollNanos, timeoutNanos - elapsedNanos));
            }
            return summary;
        }
    }
}
